A Publish-Subscribe (Pub-Sub) system is a messaging pattern where publishers send messages to topics without knowing who will receive them, and subscribers receive messages by subscribing to those topics.
This decouples senders and receivers, making it easier to build scalable and flexible systems. Popular examples include Kafka, Redis Pub/Sub, and Google Cloud Pub/Sub.
In this chapter, we will explore the low-level design of a simple in-memory pub-sub system.
Let's start by clarifying the requirements:
Before starting the design, it's important to ask thoughtful questions to uncover hidden assumptions and better define the scope of the system.
Here is an example of how a conversation between the candidate and the interviewer might unfold:
Candidate: Should the system support multiple publishers and subscribers for a single topic?
Interviewer: Yes, each topic can have multiple publishers and multiple subscribers. For this design, the publisher can simply be the client / demo using it.
Candidate: Should message delivery be synchronous or asynchronous?
Interviewer: It should be asynchronous. When a publisher sends a message, it should not wait for subscribers to consume it.
Candidate: Do we need to guarantee message ordering for subscribers?
Interviewer: Yes, messages within a topic should be delivered to each subscriber in the order they were published.
Candidate: Should we support different delivery semantics like exactly-once or at-least-once delivery?
Interviewer: For this design, let’s keep it simple and go with a "fire-and-forget" model. That means best-effort delivery without retries, acknowledgments, or delivery guarantees.
Candidate: Can subscribers unsubscribe at any point during runtime?
Interviewer: Yes, subscribers should be able to dynamically subscribe and unsubscribe from topics at any time.
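The asynchronous, ordered, fire-and-forget semantics agreed above can be pinned down with a small sketch. It assumes one FIFO queue per subscriber, each drained by its own worker thread. publish() only enqueues and returns, and because a single thread consumes each queue, every subscriber handles messages in publish order. OrderedDispatcher, its methods, and the string-payload handlers are hypothetical simplifications, not part of the design developed below.

```python
import queue
import threading
from typing import Callable, Dict, List

# Hypothetical simplification of Subscriber.on_message: a handler receives the payload string
Handler = Callable[[str], None]


class OrderedDispatcher:
    """Hypothetical sketch: one FIFO queue and one worker thread per subscriber."""

    def __init__(self) -> None:
        self._queues: Dict[str, queue.Queue] = {}
        self._workers: List[threading.Thread] = []
        self._lock = threading.Lock()

    def subscribe(self, subscriber_id: str, handler: Handler) -> None:
        with self._lock:
            if subscriber_id in self._queues:
                return  # already subscribed: keep the existing queue and worker
            q: queue.Queue = queue.Queue()
            worker = threading.Thread(target=self._drain, args=(q, handler), daemon=True)
            self._queues[subscriber_id] = q
            self._workers.append(worker)
        worker.start()

    def unsubscribe(self, subscriber_id: str) -> None:
        with self._lock:
            q = self._queues.pop(subscriber_id, None)
        if q is not None:
            q.put(None)  # sentinel: the worker handles earlier messages, then exits

    def publish(self, payload: str) -> None:
        # Only enqueues; the publisher never waits for a handler to run
        with self._lock:
            targets = list(self._queues.values())
        for q in targets:
            q.put(payload)

    def close(self) -> None:
        # Stop every remaining worker once it has drained its queue
        with self._lock:
            targets = list(self._queues.values())
            self._queues.clear()
            workers = list(self._workers)
        for q in targets:
            q.put(None)
        for worker in workers:
            worker.join()

    @staticmethod
    def _drain(q: queue.Queue, handler: Handler) -> None:
        # A single consumer per queue means messages are handled in FIFO order
        while True:
            payload = q.get()
            if payload is None:
                return
            try:
                handler(payload)
            except Exception as exc:
                # Fire-and-forget: report the failure and move on (no retry, no ack)
                print(f"delivery failed: {exc}")


# Usage: publish() returns while the handler is still blocked, yet order is preserved
received: List[str] = []
gate = threading.Event()


def slow_handler(payload: str) -> None:
    gate.wait()  # simulate a slow consumer until the gate opens
    received.append(payload)


dispatcher = OrderedDispatcher()
dispatcher.subscribe("SportsFan1", slow_handler)
for i in range(5):
    dispatcher.publish(f"m{i}")
print(received)  # [] (the handler is still blocked, but every publish() already returned)
gate.set()
dispatcher.close()
print(received)  # ['m0', 'm1', 'm2', 'm3', 'm4']
```

The trade-off is one thread per subscriber: simple and order-preserving, but it would need revisiting (for example, multiplexing the queues onto a shared pool) for very large subscriber counts.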
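After gathering the details, we can summarize the key system requirements:
- Each topic can have multiple publishers and multiple subscribers; the client (or demo) using the system acts as the publisher.
- Message delivery is asynchronous: publishing a message does not wait for subscribers to consume it.
- Within a topic, messages are delivered to each subscriber in the order they were published.
- Delivery is fire-and-forget (best effort), with no retries, acknowledgments, or delivery guarantees.
- Subscribers can subscribe to and unsubscribe from topics dynamically at runtime.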
With the requirements clear, let's identify the core entities (objects) in our system.
Core entities are the fundamental building blocks of our system. We identify them by analyzing the functional requirements and highlighting the key nouns and responsibilities that naturally map to object-oriented abstractions such as classes, interfaces, or enums.
Let’s walk through the functional requirements and extract the relevant entities:
Publishing messages to named channels that several subscribers can follow suggests the need for a Topic entity, which acts as a named communication channel. Each topic maintains a set of subscribers and is responsible for dispatching published messages to them in order.
Each published message is a discrete unit of communication, which introduces the need for a Message entity.
For this design, we can treat the client or demo class as the publisher. There is no need for a separate Publisher class unless extended functionality is required.
A central PubSubService entity will serve as the orchestrator of the system. It will handle topic creation, manage subscriptions, accept published messages, and route them to the appropriate topic for distribution.
The ability to subscribe to and unsubscribe from topics at runtime indicates the need for a Subscriber entity, which registers interest in specific topics and receives messages from them.
- Topic: Represents a named communication channel. Manages a set of subscribers and distributes messages to them asynchronously.
- Message: Represents a single unit of data published to a topic. Contains the payload along with metadata such as a timestamp and, possibly, a unique identifier.
- Subscriber: Represents an entity that listens to messages from one or more topics. Can subscribe and unsubscribe at runtime.
- PubSubService: The central coordinator. Provides APIs for creating topics, managing subscriptions, and routing published messages to the appropriate topic.

These core entities define the key abstractions of a Pub-Sub system and will guide the structure of our low-level design and class diagrams.
This section details the classes, their relationships, and the design patterns used to structure the Pub-Sub system.
The system is built around a few core classes and interfaces that manage topics, messages, and subscribers.
Message
A simple, immutable Data Transfer Object (DTO) that represents the data being transmitted.
It contains a str payload and a datetime timestamp. Its immutability makes it inherently thread-safe.
Subscriber (Interface)
Defines the contract for any class that wishes to receive messages.
It declares two methods: get_id(), which uniquely identifies the subscriber, and on_message(message), the callback invoked when a message is delivered.
AlertSubscriber & NewsSubscriber
These are concrete implementations of the Subscriber interface. They demonstrate how different types of subscribers can handle the same message in unique ways, promoting system flexibility.
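Topic
Represents a distinct message channel.
It maintains a thread-safe set of Subscriber objects and is responsible for broadcasting messages to them. Message delivery is handled asynchronously by submitting each delivery to a shared ThreadPoolExecutor, so the publisher is not blocked. Because the pool may run several deliveries at once, this alone does not guarantee that a subscriber sees messages in publish order; meeting the ordering requirement needs per-subscriber serialization, such as a dedicated queue per subscriber.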
PubSubService
The central entry point and control hub for the entire system.
It manages the lifecycle of topics and provides a clean API for clients to publish messages and manage subscriptions. It is implemented as a Singleton to ensure a single point of control.
The relationships between classes define the system's architecture, ensuring loose coupling and high cohesion.
Composition
A strong "has-a" relationship where one object owns another.
- PubSubService has Topics: The PubSubService creates and manages a Dict[str, Topic]. The lifecycle of a Topic is controlled entirely by the PubSubService.
- Topic has Subscribers: A Topic manages a Set[Subscriber] for its channel. It controls which subscribers are associated with it.

Aggregation
A "has-a" relationship where objects are related but have independent lifecycles.
- PubSubService has a ThreadPoolExecutor: The service creates and owns a thread pool for message delivery.
- Topic is associated with a ThreadPoolExecutor: Each topic receives a reference to the shared ThreadPoolExecutor from PubSubService and uses it to perform its broadcasting tasks.

Inheritance (Interface Implementation)
An "is-a" relationship based on an interface contract.
- AlertSubscriber and NewsSubscriber are Subscribers. Both implement the Subscriber interface, providing concrete logic for the on_message() method.

Dependency
A "uses-a" relationship where one class depends on another to perform its function.
- PubSubService, Topic, and Subscriber all depend on the Message class to publish, broadcast, and receive data, respectively.

Several design patterns are employed to create a robust, scalable, and maintainable system.
Observer Pattern
This is the foundational pattern of the system.
- Subject: The Topic class acts as the subject. It maintains a set of observers and notifies them of state changes (new messages).
- Observer: The Subscriber interface acts as the observer. Concrete subscribers (NewsSubscriber, AlertSubscriber) register with a Topic to receive updates.
- Notification: When PubSubService.publish() is called, the corresponding Topic's broadcast() method iterates through its subscribers and calls on_message() on each one, decoupling the Topic from the concrete Subscriber implementations.

Strategy Pattern
The Subscriber interface can be viewed as a strategy interface. It defines an algorithm (on_message). Concrete implementations (AlertSubscriber, NewsSubscriber) provide different strategies for how a message should be processed. The Topic is configured with a set of these strategies (its subscribers) and applies them when a message is broadcast.
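A minimal, synchronous sketch of the two roles, with threading left out: the subject keeps a list of observers and calls each one on notify, and each observer is a strategy for handling the payload. It assumes plain Python callables stand in for Subscriber objects (a common Python alternative to a one-method interface); SimpleTopic, attach, detach, and notify are hypothetical names.

```python
from typing import Callable, List

# A "strategy" here is any callable that takes a payload string
Strategy = Callable[[str], None]


class SimpleTopic:
    """Hypothetical subject: keeps a list of observers and notifies each in turn."""

    def __init__(self) -> None:
        self._observers: List[Strategy] = []

    def attach(self, observer: Strategy) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Strategy) -> None:
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self, payload: str) -> None:
        # The subject only knows the callable signature, not any concrete class
        for observer in list(self._observers):
            observer(payload)


log: List[str] = []


def alert_strategy(payload: str) -> None:
    log.append(f"ALERT: {payload}")


def news_strategy(payload: str) -> None:
    log.append(f"news: {payload}")


topic = SimpleTopic()
topic.attach(alert_strategy)
topic.attach(news_strategy)
topic.notify("Server down")
topic.detach(news_strategy)
topic.notify("Server up")
print(log)  # ['ALERT: Server down', 'news: Server down', 'ALERT: Server up']
```

Swapping or adding a handling strategy only means attaching a different callable; SimpleTopic itself never changes.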
Facade Pattern
The PubSubService serves as a facade. It provides a simplified, high-level interface (create_topic, subscribe, publish) to the client, hiding the more complex underlying components and interactions, such as topic creation, subscriber registration, and asynchronous message delivery.
Singleton Pattern
The PubSubService is implemented as a singleton. This ensures there is only one instance managing all topics and the shared thread pool, providing a single, globally accessible point of control for the entire pub-sub mechanism.
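In Python, __init__ runs on every call to the class, even when __new__ returns an existing instance, so a singleton must make sure its state is initialized exactly once. One way to do that, sketched below with a hypothetical Registry class and a demonstration-only init_count counter, is to perform double-checked locking in __new__ and set up all state there while the lock is held.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class Registry:
    """Hypothetical thread-safe singleton used only to illustrate the idiom."""

    _instance = None
    _lock = threading.Lock()
    init_count = 0  # demonstration only: how many times state was initialized

    def __new__(cls):
        if cls._instance is None:  # fast path: no locking once the instance exists
            with cls._lock:
                if cls._instance is None:  # re-check: another thread may have won the race
                    instance = super().__new__(cls)
                    instance.items = {}  # all state is set up here, exactly once
                    cls.init_count += 1
                    cls._instance = instance
        return cls._instance


# Construct the singleton from many threads at once
with ThreadPoolExecutor(max_workers=16) as pool:
    instances = list(pool.map(lambda _: Registry(), range(100)))

print(len({id(obj) for obj in instances}), Registry.init_count)  # 1 1
```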
The Message class is a simple, immutable Data Transfer Object (DTO) that represents the content being sent through the system.
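```python
from dataclasses import dataclass, field
from datetime import datetime


@dataclass(frozen=True)
class Message:
    # frozen=True makes instances immutable: assigning to a field raises FrozenInstanceError
    payload: str
    # Creation time, captured automatically when the message is constructed
    timestamp: datetime = field(default_factory=datetime.now)

    def get_payload(self) -> str:
        return self.payload

    def __str__(self) -> str:
        return f"Message{{payload='{self.payload}'}}"
```

Each Message includes: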
- payload: the actual message text.
- timestamp: marks when the message was created.

Neither field is meant to change after construction, so a Message can be treated as immutable and shared safely between the publisher and the delivery threads.
The Subscriber interface defines the contract for any object that wishes to receive messages. This follows the Observer design pattern, allowing the system to be decoupled from the concrete classes that consume messages.
```python
from abc import ABC, abstractmethod


class Subscriber(ABC):
    @abstractmethod
    def get_id(self) -> str:
        pass

    @abstractmethod
    def on_message(self, message: Message):
        # Callback invoked on a delivery worker thread when a message arrives
        pass


class AlertSubscriber(Subscriber):
    def __init__(self, subscriber_id: str):
        self.id = subscriber_id

    def get_id(self) -> str:
        return self.id

    def on_message(self, message: Message):
        print(f"!!! [ALERT - {self.id}] : '{message.get_payload()}' !!!")


class NewsSubscriber(Subscriber):
    def __init__(self, subscriber_id: str):
        self.id = subscriber_id

    def get_id(self) -> str:
        return self.id

    def on_message(self, message: Message):
        print(f"[Subscriber {self.id}] received message '{message.get_payload()}'")
```

The on_message() method is called when a message is delivered.
Two different subscriber types simulate real-world use cases:
- AlertSubscriber: Reacts to critical alerts.
- NewsSubscriber: Receives general news.

Each implementation customizes how received messages are displayed.
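Because Topic depends only on the Subscriber interface, new subscriber types can be added without touching it. As an illustration, here is a hypothetical RecordingSubscriber that stores payloads instead of printing them, which makes delivery easy to assert in tests. The Message and Subscriber definitions are trimmed-down stand-ins so the snippet runs on its own.

```python
import threading
from abc import ABC, abstractmethod
from typing import List


class Message:
    # Trimmed-down stand-in for the Message class above
    def __init__(self, payload: str):
        self.payload = payload

    def get_payload(self) -> str:
        return self.payload


class Subscriber(ABC):
    @abstractmethod
    def get_id(self) -> str: ...

    @abstractmethod
    def on_message(self, message: Message): ...


class RecordingSubscriber(Subscriber):
    """Hypothetical subscriber that records payloads for later inspection."""

    def __init__(self, subscriber_id: str):
        self.id = subscriber_id
        self.received: List[str] = []
        # on_message may be called from several worker threads, so guard the list
        self._lock = threading.Lock()

    def get_id(self) -> str:
        return self.id

    def on_message(self, message: Message):
        with self._lock:
            self.received.append(message.get_payload())

    def snapshot(self) -> List[str]:
        # Return a copy so callers never iterate the live list
        with self._lock:
            return list(self.received)


recorder = RecordingSubscriber("Recorder1")
recorder.on_message(Message("hello"))
recorder.on_message(Message("world"))
print(recorder.snapshot())  # ['hello', 'world']
```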
A Topic represents a distinct channel for messages. It maintains its own list of subscribers and is responsible for broadcasting messages to them.
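```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Set


class Topic:
    def __init__(self, name: str, delivery_executor: ThreadPoolExecutor):
        self.name = name
        self.delivery_executor = delivery_executor
        self.subscribers: Set[Subscriber] = set()
        # Guards the subscriber set against concurrent subscribe/unsubscribe/broadcast
        self._lock = threading.Lock()

    def get_name(self) -> str:
        return self.name

    def add_subscriber(self, subscriber: Subscriber):
        with self._lock:
            self.subscribers.add(subscriber)

    def remove_subscriber(self, subscriber: Subscriber):
        with self._lock:
            self.subscribers.discard(subscriber)

    def broadcast(self, message: Message):
        # Iterate over a snapshot so a concurrent (un)subscribe cannot raise
        # "Set changed size during iteration"
        with self._lock:
            snapshot = list(self.subscribers)
        for subscriber in snapshot:
            # Each delivery runs on the shared pool; the publisher does not wait
            self.delivery_executor.submit(self._deliver_message, subscriber, message)

    def _deliver_message(self, subscriber: Subscriber, message: Message):
        try:
            subscriber.on_message(message)
        except Exception as e:
            # Fire-and-forget: log the failure and move on, with no retry
            print(f"Error delivering message to subscriber {subscriber.get_id()}: {e}")
```

Represents a channel to which messages are published and from which subscribers receive messages.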
- subscribers: A thread-safe set of subscribers.
- broadcast(): Asynchronously delivers a message to all subscribers using a thread pool.

The PubSubService class is the main facade for the system. It manages the lifecycle of topics and provides a centralized API for clients to interact with the system.
```python
import threading
from concurrent.futures import ThreadPoolExecutor


class PubSubService:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking: only the first caller creates the instance
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Initialize state here, under the lock, rather than in __init__,
                    # which Python re-runs on every PubSubService() call
                    instance.topic_registry = {}  # topic name -> Topic
                    instance._registry_lock = threading.Lock()
                    # Shared pool for short-lived delivery tasks; without max_workers,
                    # Python 3.8+ uses min(32, os.cpu_count() + 4) threads
                    instance.delivery_executor = ThreadPoolExecutor()
                    cls._instance = instance
        return cls._instance

    @classmethod
    def get_instance(cls):
        return cls()

    def create_topic(self, topic_name: str):
        # Lock so that concurrent callers cannot both create the same topic
        with self._registry_lock:
            if topic_name not in self.topic_registry:
                self.topic_registry[topic_name] = Topic(topic_name, self.delivery_executor)
                print(f"Topic {topic_name} created")

    def subscribe(self, topic_name: str, subscriber: Subscriber):
        topic = self.topic_registry.get(topic_name)
        if topic is None:
            raise ValueError(f"Topic not found: {topic_name}")
        topic.add_subscriber(subscriber)
        print(f"Subscriber '{subscriber.get_id()}' subscribed to topic: {topic_name}")

    def unsubscribe(self, topic_name: str, subscriber: Subscriber):
        topic = self.topic_registry.get(topic_name)
        if topic is not None:
            topic.remove_subscriber(subscriber)
            print(f"Subscriber '{subscriber.get_id()}' unsubscribed from topic: {topic_name}")

    def publish(self, topic_name: str, message: Message):
        print(f"Publishing message to topic: {topic_name}")
        topic = self.topic_registry.get(topic_name)
        if topic is None:
            raise ValueError(f"Topic not found: {topic_name}")
        topic.broadcast(message)

    def shutdown(self):
        print("PubSubService shutting down...")
        # Stops accepting new deliveries and waits for already-submitted ones to finish
        self.delivery_executor.shutdown(wait=True)
        print("PubSubService shutdown complete.")
```

- create_topic(): Creates a new topic if it doesn't already exist.
- subscribe() / unsubscribe(): Manage subscriber enrollment.
- publish(): Triggers broadcast of a message to all subscribers of a topic.
- shutdown(): Stops the delivery pool after waiting for pending deliveries to complete.

The PubSubDemo class demonstrates how a client would use the PubSubService to create topics, manage subscriptions, and publish messages.
```python
import time


class PubSubDemo:
    @staticmethod
    def main():
        pub_sub_service = PubSubService.get_instance()

        # --- Create Subscribers ---
        sports_fan1 = NewsSubscriber("SportsFan1")
        sports_fan2 = NewsSubscriber("SportsFan2")
        techie1 = NewsSubscriber("Techie1")
        all_news_reader = NewsSubscriber("AllNewsReader")
        system_admin = AlertSubscriber("SystemAdmin")

        # --- Create Topics and Subscriptions ---
        SPORTS_TOPIC = "SPORTS"
        TECH_TOPIC = "TECH"
        WEATHER_TOPIC = "WEATHER"

        pub_sub_service.create_topic(SPORTS_TOPIC)
        pub_sub_service.create_topic(TECH_TOPIC)
        pub_sub_service.create_topic(WEATHER_TOPIC)

        pub_sub_service.subscribe(SPORTS_TOPIC, sports_fan1)
        pub_sub_service.subscribe(SPORTS_TOPIC, sports_fan2)
        pub_sub_service.subscribe(SPORTS_TOPIC, all_news_reader)
        pub_sub_service.subscribe(SPORTS_TOPIC, system_admin)

        pub_sub_service.subscribe(TECH_TOPIC, techie1)
        pub_sub_service.subscribe(TECH_TOPIC, all_news_reader)

        print("\n--- Publishing Messages ---")
        # Deliveries run concurrently on the pool, so output order may vary

        # --- Publish to SPORTS topic ---
        pub_sub_service.publish(SPORTS_TOPIC, Message("Team A wins the championship!"))
        # Expected: SportsFan1, SportsFan2, AllNewsReader, SystemAdmin receive this.

        # --- Publish to TECH topic ---
        pub_sub_service.publish(TECH_TOPIC, Message("New AI model released."))
        # Expected: Techie1, AllNewsReader receive this.

        # --- Publish to WEATHER topic (no subscribers) ---
        pub_sub_service.publish(WEATHER_TOPIC, Message("Sunny with a high of 75°F."))
        # Expected: Message is dropped.

        # Allow some time for async messages to be processed
        time.sleep(0.5)

        print("\n--- Unsubscribing a user and re-publishing ---")

        # SportsFan2 gets tired of sports news
        pub_sub_service.unsubscribe(SPORTS_TOPIC, sports_fan2)

        # Publish another message to SPORTS
        pub_sub_service.publish(SPORTS_TOPIC, Message("Major player traded to Team B."))
        # Expected: SportsFan1, AllNewsReader, SystemAdmin receive this. SportsFan2 does NOT.

        # Give messages time to be delivered
        time.sleep(0.5)

        # --- Shutdown the service ---
        pub_sub_service.shutdown()


if __name__ == "__main__":
    PubSubDemo.main()
```
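The demo relies on time.sleep(0.5) and hopes deliveries have finished by then. A more deterministic alternative, sketched below under the assumption that broadcast() is changed to return the Future objects produced by ThreadPoolExecutor.submit(), lets the caller block with concurrent.futures.wait(). The broadcast function and the list-append handlers here are hypothetical stand-ins, not the Topic API above.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Callable, List

Handler = Callable[[str], None]


def broadcast(executor: ThreadPoolExecutor, handlers: List[Handler], payload: str) -> List[Future]:
    # Hypothetical variant of Topic.broadcast that returns one Future per delivery
    return [executor.submit(handler, payload) for handler in handlers]


received: List[str] = []
executor = ThreadPoolExecutor(max_workers=4)
futures = broadcast(executor, [received.append, received.append], "Team A wins the championship!")

# Block until every delivery completes (or the timeout expires) instead of sleeping
done, not_done = wait(futures, timeout=5)
print(len(done), len(not_done))  # 2 0
executor.shutdown(wait=True)
```

A side benefit is that an exception raised inside a handler is stored on its Future, so a caller that wants to can inspect it with Future.exception() rather than relying only on printed errors.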